/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.lucene.tests.index;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.ToIntFunction;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterMergePolicy;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergePolicy.MergeContext;
import org.apache.lucene.index.MergePolicy.MergeSpecification;
import org.apache.lucene.index.MergePolicy.OneMerge;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.MergeTrigger;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SerialMergeScheduler;
import org.apache.lucene.internal.tests.IndexWriterAccess;
import org.apache.lucene.internal.tests.TestSecrets;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.tests.analysis.MockAnalyzer;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.tests.util.NullInfoStream;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.Version;

/** Base test case for {@link MergePolicy}. */
public abstract class BaseMergePolicyTestCase extends LuceneTestCase {

  // Accessor obtained from TestSecrets; this class uses it to read an IndexWriter's current
  // segment count.
  private static final IndexWriterAccess INDEX_WRITER_ACCESS = TestSecrets.getIndexWriterAccess();

  /**
   * Create a new {@link MergePolicy} instance.
   *
   * <p>The tests in this class call this method to obtain the policy they exercise.
   *
   * @return the merge policy under test
   */
  protected abstract MergePolicy mergePolicy();

  /**
   * Assert that the given segment infos match expectations of the merge policy, assuming segments
   * that have only been either flushed or merged with this merge policy.
   *
   * <p>The simulation tests in this class call this after every simulated flush, once the policy
   * no longer proposes any merge.
   *
   * @param policy the merge policy under test
   * @param infos the simulated segments to check
   * @throws IOException declared so that implementations may propagate I/O failures
   */
  protected abstract void assertSegmentInfos(MergePolicy policy, SegmentInfos infos)
      throws IOException;

  /**
   * Assert that the given merge matches expectations of the merge policy.
   *
   * <p>The simulation tests in this class call this on every {@link MergeSpecification} returned
   * by the policy, before the merges it contains are applied to the simulated segments.
   *
   * @param policy the merge policy under test
   * @param merge the merges proposed by {@code policy}
   * @throws IOException declared so that implementations may propagate I/O failures
   */
  protected abstract void assertMerge(MergePolicy policy, MergeSpecification merge)
      throws IOException;

  /**
   * Checks that {@link IndexWriter#forceMerge(int)} does not run any merge when the index already
   * has at most the requested number of segments.
   *
   * <p>The test indexes several small batches of empty documents, opening and closing a reader on
   * the writer after each batch, then calls {@code forceMerge} six times with random targets (the
   * last call always targets a single segment). Before each call it records whether merging is
   * legitimate, and a custom {@link MergeScheduler} fails the test if the merge source still hands
   * out a merge when it is not.
   */
  public void testForceMergeNotNeeded() throws IOException {
    try (Directory dir = newDirectory()) {
      // Flipped to false right before a forceMerge call that must not produce any merge.
      final AtomicBoolean mayMerge = new AtomicBoolean(true);
      final MergeScheduler mergeScheduler =
          new SerialMergeScheduler() {
            @Override
            public synchronized void merge(MergeSource mergeSource, MergeTrigger trigger)
                throws IOException {
              if (mayMerge.get() == false) {
                MergePolicy.OneMerge merge = mergeSource.getNextMerge();
                if (merge != null) {
                  // Carry the diagnostic in the error itself so that it shows up in the failure.
                  throw new AssertionError(
                      "TEST: we should not need any merging, yet merge policy returned merge "
                          + merge);
                }
              }

              super.merge(mergeSource, trigger);
            }
          };

      MergePolicy mp = mergePolicy();
      assumeFalse(
          "this test cannot tolerate random forceMerges",
          mp.toString().contains("MockRandomMergePolicy"));
      mp.setNoCFSRatio(random().nextBoolean() ? 0 : 1);

      IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()));
      iwc.setMergeScheduler(mergeScheduler);
      iwc.setMergePolicy(mp);

      // try-with-resources so that the writer is also closed when the test body fails; any
      // exception thrown by close() is then attached to the original failure as suppressed.
      try (IndexWriter writer = new IndexWriter(dir, iwc)) {
        final int numSegments = TestUtil.nextInt(random(), 2, 20);
        for (int i = 0; i < numSegments; ++i) {
          final int numDocs = TestUtil.nextInt(random(), 1, 5);
          for (int j = 0; j < numDocs; ++j) {
            writer.addDocument(new Document());
          }
          // Presumably makes the buffered documents visible as a new segment.
          DirectoryReader.open(writer).close();
        }
        for (int i = 5; i >= 0; --i) {
          final int segmentCount = INDEX_WRITER_ACCESS.getSegmentCount(writer);
          // The last round always asks for a single segment.
          final int maxNumSegments = i == 0 ? 1 : TestUtil.nextInt(random(), 1, 10);
          mayMerge.set(segmentCount > maxNumSegments);
          if (VERBOSE) {
            System.out.println(
                "TEST: now forceMerge(maxNumSegments="
                    + maxNumSegments
                    + ") vs segmentCount="
                    + segmentCount);
          }
          writer.forceMerge(maxNumSegments);
        }
      }
    }
  }

  public void testFindForcedDeletesMerges() throws IOException {
    MergePolicy mp = mergePolicy();
    if (mp instanceof FilterMergePolicy) {
      assumeFalse(
          "test doesn't work with MockRandomMP",
          ((FilterMergePolicy) mp).unwrap() instanceof MockRandomMergePolicy);
    }
    SegmentInfos infos = new SegmentInfos(Version.LATEST.major);
    try (Directory directory = newDirectory()) {
      MergePolicy.MergeContext context = new MockMergeContext(_ -> 0);
      int numSegs = random().nextInt(10);
      for (int i = 0; i < numSegs; i++) {
        SegmentInfo info =
            new SegmentInfo(
                directory, // dir
                Version.LATEST, // version
                Version.LATEST, // min version
                TestUtil.randomSimpleString(random()), // name
                random().nextInt(Integer.MAX_VALUE), // maxDoc
                random().nextBoolean(), // isCompoundFile
                false,
                null, // codec
                Collections.emptyMap(), // diagnostics
                TestUtil.randomSimpleString( // id
                        random(), StringHelper.ID_LENGTH, StringHelper.ID_LENGTH)
                    .getBytes(StandardCharsets.US_ASCII),
                Collections.emptyMap(), // attributes
                null /* indexSort */);
        info.setFiles(Collections.emptyList());
        infos.add(
            new SegmentCommitInfo(
                info, random().nextInt(1), 0, -1, -1, -1, StringHelper.randomId()));
      }
      MergePolicy.MergeSpecification forcedDeletesMerges =
          mp.findForcedDeletesMerges(infos, context);
      if (forcedDeletesMerges != null) {
        assertEquals(0, forcedDeletesMerges.merges.size());
      }
    }
  }

  /**
   * Minimal {@link MergePolicy.MergeContext} for tests: delete counts come from a caller-supplied
   * function, info-stream output is disabled, and the set of merging segments is whatever was last
   * passed to {@link #setMergingSegments(Set)} (initially empty).
   */
  public static final class MockMergeContext implements MergePolicy.MergeContext {
    private final ToIntFunction<SegmentCommitInfo> deleteCounter;

    private Set<SegmentCommitInfo> currentlyMerging = Collections.emptySet();

    private final InfoStream silentInfoStream =
        new NullInfoStream() {
          @Override
          public boolean isEnabled(String component) {
            // otherwise tests that simulate merging may bottleneck on generating messages
            return false;
          }
        };

    /**
     * Creates a context.
     *
     * @param numDeletesFunc computes the number of deletes reported for a segment
     */
    public MockMergeContext(ToIntFunction<SegmentCommitInfo> numDeletesFunc) {
      this.deleteCounter = numDeletesFunc;
    }

    @Override
    public int numDeletesToMerge(SegmentCommitInfo info) {
      return deleteCounter.applyAsInt(info);
    }

    /** Reports the same value as {@link #numDeletesToMerge(SegmentCommitInfo)}. */
    @Override
    public int numDeletedDocs(SegmentCommitInfo info) {
      return deleteCounter.applyAsInt(info);
    }

    @Override
    public InfoStream getInfoStream() {
      return silentInfoStream;
    }

    @Override
    public Set<SegmentCommitInfo> getMergingSegments() {
      return currentlyMerging;
    }

    /** Replaces the set returned by {@link #getMergingSegments()}; the set is not copied. */
    public void setMergingSegments(Set<SegmentCommitInfo> mergingSegments) {
      this.currentlyMerging = mergingSegments;
    }
  }

  /**
   * Make a new {@link SegmentCommitInfo} with the given {@code maxDoc}, {@code numDeletedDocs} and
   * size, which are usually the numbers that merge policies care about.
   *
   * <p>The segment lives in a fake directory and has a single fake file whose name encodes the
   * size in bytes, so no real file is ever created.
   *
   * @param name segment name, must start with an underscore
   * @param maxDoc number of documents in the segment, deleted ones included
   * @param numDeletedDocs number of deleted documents
   * @param sizeMB size of the segment in megabytes; truncated to a whole number of bytes
   * @param source value recorded under the {@link IndexWriter#SOURCE} attribute key
   * @throws IllegalArgumentException if {@code name} does not start with an underscore
   */
  protected static SegmentCommitInfo makeSegmentCommitInfo(
      String name, int maxDoc, int numDeletedDocs, double sizeMB, String source) {
    if (name.startsWith("_") == false) {
      throw new IllegalArgumentException("name must start with an _, got " + name);
    }

    final byte[] segmentId = new byte[StringHelper.ID_LENGTH];
    random().nextBytes(segmentId);

    final long sizeInBytes = (long) (sizeMB * 1024 * 1024);
    final String fakeFileName = name + "_size=" + sizeInBytes + ".fake";

    final SegmentInfo segmentInfo =
        new SegmentInfo(
            FAKE_DIRECTORY,
            Version.LATEST,
            Version.LATEST,
            name,
            maxDoc,
            false,
            false,
            TestUtil.getDefaultCodec(),
            Collections.emptyMap(),
            segmentId,
            Collections.singletonMap(IndexWriter.SOURCE, source),
            null);
    segmentInfo.setFiles(Collections.singleton(fakeFileName));

    return new SegmentCommitInfo(segmentInfo, numDeletedDocs, 0, 0, 0, 0, StringHelper.randomId());
  }

  /**
   * A directory that computes the length of a file based on its name.
   *
   * <p>Only {@link Directory#fileLength(String)} is supported: names ending in {@code .fake} must
   * look like {@code <prefix>_size=<bytes>.fake} and report {@code <bytes>}, names ending in {@code
   * .liv} report a length of 0, and any other name is rejected. Every other operation throws {@link
   * UnsupportedOperationException}.
   */
  private static final Directory FAKE_DIRECTORY =
      new Directory() {

        @Override
        public String[] listAll() throws IOException {
          throw new UnsupportedOperationException();
        }

        @Override
        public void deleteFile(String name) throws IOException {
          throw new UnsupportedOperationException();
        }

        @Override
        public long fileLength(String name) throws IOException {
          // NOTE(review): presumably these are the live-docs files of segments that received
          // deletes; they are counted as empty.
          if (name.endsWith(".liv")) {
            return 0L;
          }
          if (name.endsWith(".fake") == false) {
            throw new IllegalArgumentException(name);
          }
          int markerIndex = name.indexOf("_size=");
          if (markerIndex < 0) {
            // Fail with a clear message rather than parsing from a bogus offset.
            throw new IllegalArgumentException("missing _size= marker in file name: " + name);
          }
          int startIndex = markerIndex + "_size=".length();
          int endIndex = name.length() - ".fake".length();
          return Long.parseLong(name.substring(startIndex, endIndex));
        }

        @Override
        public IndexOutput createOutput(String name, IOContext context) throws IOException {
          throw new UnsupportedOperationException();
        }

        @Override
        public IndexOutput createTempOutput(String prefix, String suffix, IOContext context)
            throws IOException {
          throw new UnsupportedOperationException();
        }

        @Override
        public void sync(Collection<String> names) throws IOException {
          throw new UnsupportedOperationException();
        }

        @Override
        public void rename(String source, String dest) throws IOException {
          throw new UnsupportedOperationException();
        }

        @Override
        public void syncMetaData() throws IOException {
          throw new UnsupportedOperationException();
        }

        @Override
        public IndexInput openInput(String name, IOContext context) throws IOException {
          throw new UnsupportedOperationException();
        }

        @Override
        public Lock obtainLock(String name) throws IOException {
          throw new UnsupportedOperationException();
        }

        @Override
        public void close() throws IOException {
          throw new UnsupportedOperationException();
        }

        @Override
        public Set<String> getPendingDeletions() throws IOException {
          throw new UnsupportedOperationException();
        }
      };

  /**
   * Apply a merge to a {@link SegmentInfos} instance, accumulating the number of written bytes into
   * {@code stats}.
   *
   * <p>The merged segment holds the live documents of the merged segments, and its size is the sum
   * of their sizes, each scaled by its ratio of live documents. In the returned infos it takes the
   * position of the first segment that was merged away; all other segments keep their order.
   *
   * @param infos the current segments, left unmodified
   * @param merge the merge to simulate
   * @param mergedSegmentName name of the segment produced by the merge
   * @param stats statistics whose {@code mergeBytesWritten} is increased by the merged size
   * @return a new {@link SegmentInfos} reflecting the merge
   */
  protected static SegmentInfos applyMerge(
      SegmentInfos infos, OneMerge merge, String mergedSegmentName, IOStats stats)
      throws IOException {

    int mergedDocCount = 0;
    double mergedSizeMB = 0;
    for (SegmentCommitInfo source : merge.segments) {
      final int liveDocs = source.info.maxDoc() - source.getDelCount();
      mergedSizeMB += (double) source.sizeInBytes() * liveDocs / source.info.maxDoc() / 1024 / 1024;
      mergedDocCount += liveDocs;
    }
    final SegmentCommitInfo mergedInfo =
        makeSegmentCommitInfo(
            mergedSegmentName, mergedDocCount, 0, mergedSizeMB, IndexWriter.SOURCE_MERGE);

    final Set<SegmentCommitInfo> mergedAway = new HashSet<>(merge.segments);
    final SegmentInfos result = new SegmentInfos(Version.LATEST.major);
    boolean inserted = false;
    for (SegmentCommitInfo existing : infos.asList()) {
      if (mergedAway.contains(existing) == false) {
        result.add(existing);
        continue;
      }
      // The merged segment replaces the first of the segments that it consumed.
      if (inserted == false) {
        result.add(mergedInfo);
        inserted = true;
      }
    }

    stats.mergeBytesWritten += mergedSizeMB * 1024 * 1024;
    return result;
  }

  /**
   * Apply {@code numDeletes} uniformly across all segments of {@code infos}.
   *
   * <p>Each segment but the last receives a share of the deletes proportional to its number of
   * live documents, rounded up and capped by what is left to distribute; the last segment receives
   * the remainder. Segments that end up fully deleted are dropped from the result.
   *
   * @param infos the current segments, left unmodified
   * @param numDeletes total number of live documents to mark as deleted
   * @return a new {@link SegmentInfos} reflecting the deletes
   * @throws IllegalArgumentException if {@code numDeletes} exceeds the number of live documents
   */
  protected static SegmentInfos applyDeletes(SegmentInfos infos, int numDeletes) {
    final List<SegmentCommitInfo> segments = infos.asList();

    int totalLiveDocs = 0;
    for (SegmentCommitInfo segment : segments) {
      totalLiveDocs += segment.info.maxDoc() - segment.getDelCount();
    }
    if (numDeletes > totalLiveDocs) {
      throw new IllegalArgumentException("More deletes than documents");
    }

    final double deleteRatio = (double) numDeletes / totalLiveDocs;
    final SegmentInfos newInfos = new SegmentInfos(Version.LATEST.major);
    final int lastIndex = segments.size() - 1;
    int remaining = numDeletes;
    for (int idx = 0; idx <= lastIndex; idx++) {
      assert remaining >= 0;
      final SegmentCommitInfo sci = segments.get(idx);
      final int liveDocs = sci.info.maxDoc() - sci.getDelCount();
      // The last segment absorbs whatever rounding left over.
      final int segDeletes =
          idx == lastIndex
              ? remaining
              : Math.min(remaining, (int) Math.ceil(deleteRatio * liveDocs));
      final int newDelCount = sci.getDelCount() + segDeletes;
      assert newDelCount <= sci.info.maxDoc();
      if (newDelCount < sci.info.maxDoc()) { // drop fully deleted segments
        newInfos.add(
            new SegmentCommitInfo(
                sci.info,
                newDelCount,
                0,
                sci.getDelGen() + 1,
                sci.getFieldInfosGen(),
                sci.getDocValuesGen(),
                StringHelper.randomId()));
      }
      remaining -= segDeletes;
    }
    assert remaining == 0;
    return newInfos;
  }

  /**
   * Simulate an append-only use-case, ie. there are no deletes, with 100 million documents and
   * flushes of at most 10,000 documents.
   *
   * <p>TODO: incredibly slow (at least with TestUpgradeIndexMergePolicy)
   */
  @Nightly
  public void testSimulateAppendOnly() throws IOException {
    final int totalDocs = 100_000_000;
    final int maxDocsPerFlush = 10_000;
    doTestSimulateAppendOnly(mergePolicy(), totalDocs, maxDocsPerFlush);
  }

  /**
   * Simulate an append-only use-case, ie. there are no deletes. At least {@code totalDocs}
   * documents exist in the index in the end, and flushes contribute at most {@code
   * maxDocsPerFlush} documents, at a simulated 5kB per document.
   *
   * <p>After every simulated flush, the merges proposed by the policy are checked with {@link
   * #assertMerge} and applied until the policy has nothing more to propose, then the resulting
   * segments are checked with {@link #assertSegmentInfos}.
   *
   * @param mergePolicy the merge policy to simulate
   * @param totalDocs number of documents after which the simulation stops
   * @param maxDocsPerFlush maximum number of documents of a flushed segment
   */
  @SuppressWarnings("UnnecessaryAsync")
  protected void doTestSimulateAppendOnly(
      MergePolicy mergePolicy, int totalDocs, int maxDocsPerFlush) throws IOException {
    final IOStats stats = new IOStats();
    final AtomicLong segNameGenerator = new AtomicLong();
    final MergeContext mergeContext = new MockMergeContext(SegmentCommitInfo::getDelCount);
    final double avgDocSizeMB = 5. / 1024; // 5kB
    SegmentInfos segmentInfos = new SegmentInfos(Version.LATEST.major);

    int indexedDocs = 0;
    while (indexedDocs < totalDocs) {
      // Simulate the flush of a new segment.
      final int flushDocCount = TestUtil.nextInt(random(), 1, maxDocsPerFlush);
      indexedDocs += flushDocCount;
      final double flushSizeMB = flushDocCount * avgDocSizeMB;
      stats.flushBytesWritten += flushSizeMB * 1024 * 1024;
      final String flushedName = "_" + segNameGenerator.getAndIncrement();
      segmentInfos.add(
          makeSegmentCommitInfo(
              flushedName, flushDocCount, 0, flushSizeMB, IndexWriter.SOURCE_FLUSH));

      // Ask for full-flush merges first and fall back to regular merges.
      MergeSpecification pending =
          mergePolicy.findFullFlushMerges(MergeTrigger.SEGMENT_FLUSH, segmentInfos, mergeContext);
      if (pending == null) {
        pending = mergePolicy.findMerges(MergeTrigger.SEGMENT_FLUSH, segmentInfos, mergeContext);
      }
      // Keep applying merges until the policy has nothing left to propose.
      for (;
          pending != null;
          pending =
              mergePolicy.findMerges(MergeTrigger.MERGE_FINISHED, segmentInfos, mergeContext)) {
        assertFalse(pending.merges.isEmpty());
        assertMerge(mergePolicy, pending);
        for (OneMerge oneMerge : pending.merges) {
          final String mergedName = "_" + segNameGenerator.getAndIncrement();
          segmentInfos = applyMerge(segmentInfos, oneMerge, mergedName, stats);
        }
      }
      assertSegmentInfos(mergePolicy, segmentInfos);
    }

    if (VERBOSE) {
      final double writeAmplification =
          (double) (stats.flushBytesWritten + stats.mergeBytesWritten) / stats.flushBytesWritten;
      System.out.println("Write amplification for append-only: " + writeAmplification);
    }
  }

  /**
   * Simulate an update use-case where documents are uniformly updated across segments, with at
   * least one million documents and flushes of at most 2,500 documents.
   */
  public void testSimulateUpdates() throws IOException {
    final int maxDocsPerFlush = 2500;
    // Draw the document count before creating the policy, both may consume randomness.
    final int totalDocs = atLeast(1_000_000);
    doTestSimulateUpdates(mergePolicy(), totalDocs, maxDocsPerFlush);
  }

  /**
   * Simulate an update use-case where documents are uniformly updated across segments. At least
   * {@code totalDocs} live documents exist in the index in the end, and flushes contribute at most
   * {@code maxDocsPerFlush} documents, at a simulated 5kB per document.
   *
   * <p>A share of the documents of every flush is treated as updates of existing documents: the
   * corresponding number of deletes is spread over the existing segments before the new segment is
   * added. That share grows with the number of live documents, up to 90% of the flush. After every
   * simulated flush, the merges proposed by the policy are checked with {@link #assertMerge} and
   * applied until the policy has nothing more to propose, then the resulting segments are checked
   * with {@link #assertSegmentInfos}.
   *
   * @param mergePolicy the merge policy to simulate
   * @param totalDocs number of live documents after which the simulation stops
   * @param maxDocsPerFlush maximum number of documents of a flushed segment
   */
  @SuppressWarnings("UnnecessaryAsync")
  protected void doTestSimulateUpdates(MergePolicy mergePolicy, int totalDocs, int maxDocsPerFlush)
      throws IOException {
    IOStats stats = new IOStats();
    AtomicLong segNameGenerator = new AtomicLong();
    MergeContext mergeContext = new MockMergeContext(SegmentCommitInfo::getDelCount);
    SegmentInfos segmentInfos = new SegmentInfos(Version.LATEST.major);
    final double avgDocSizeMB = 5. / 1024; // 5kB
    for (int numDocs = 0; numDocs < totalDocs; ) {
      final int flushDocCount;
      if (usually()) {
        // reasonable value
        flushDocCount = TestUtil.nextInt(random(), maxDocsPerFlush / 2, maxDocsPerFlush);
      } else {
        // crazy value
        flushDocCount = TestUtil.nextInt(random(), 1, maxDocsPerFlush);
      }
      // how many of these documents are actually updates
      int delCount = (int) (flushDocCount * 0.9 * numDocs / totalDocs);
      // only documents that are not updates increase the number of live documents
      numDocs += flushDocCount - delCount;
      segmentInfos = applyDeletes(segmentInfos, delCount);
      double flushSize = flushDocCount * avgDocSizeMB;
      stats.flushBytesWritten += flushSize * 1024 * 1024;
      segmentInfos.add(
          makeSegmentCommitInfo(
              "_" + segNameGenerator.getAndIncrement(),
              flushDocCount,
              0,
              flushSize,
              IndexWriter.SOURCE_FLUSH));
      MergeSpecification merges =
          mergePolicy.findFullFlushMerges(MergeTrigger.SEGMENT_FLUSH, segmentInfos, mergeContext);
      if (merges == null) {
        merges = mergePolicy.findMerges(MergeTrigger.SEGMENT_FLUSH, segmentInfos, mergeContext);
      }
      while (merges != null) {
        // Same check as the other simulations: a specification without merges leaves the segments
        // unchanged, so the policy could keep returning it and this loop would never terminate.
        assertTrue(
            "merge policy returned a MergeSpecification without any merge",
            merges.merges.size() > 0);
        assertMerge(mergePolicy, merges);
        for (OneMerge oneMerge : merges.merges) {
          segmentInfos =
              applyMerge(segmentInfos, oneMerge, "_" + segNameGenerator.getAndIncrement(), stats);
        }
        merges = mergePolicy.findMerges(MergeTrigger.MERGE_FINISHED, segmentInfos, mergeContext);
      }
      assertSegmentInfos(mergePolicy, segmentInfos);
    }

    if (VERBOSE) {
      System.out.println(
          "Write amplification for update: "
              + (double) (stats.flushBytesWritten + stats.mergeBytesWritten)
                  / stats.flushBytesWritten);
      int totalDelCount =
          segmentInfos.asList().stream().mapToInt(SegmentCommitInfo::getDelCount).sum();
      int totalMaxDoc =
          segmentInfos.asList().stream().map(s -> s.info).mapToInt(SegmentInfo::maxDoc).sum();
      System.out.println("Final live ratio: " + (1 - (double) totalDelCount / totalMaxDoc));
    }
  }

  /**
   * Statistics about bytes written to storage.
   *
   * <p>The simulation helpers in this class add to these counters as they simulate flushes and
   * merges, and divide their sum by {@link #flushBytesWritten} to compute write amplification.
   */
  public static class IOStats {
    /** Bytes written through flushes. */
    long flushBytesWritten;

    /** Bytes written through merges. */
    long mergeBytesWritten;
  }

  /**
   * Checks that the merge policy does not perform pathological amounts of merging when indexing
   * many tiny segments.
   *
   * <p>The test simulates flushes of 1 to 3 documents of about 10 bytes each until 10,000 documents
   * are indexed, applying the merges proposed by the policy after every flush, and then checks that
   * the overall write amplification stays below {@code log(numFlushes)/log(1.5)}.
   */
  @SuppressWarnings("UnnecessaryAsync")
  public void testNoPathologicalMerges() throws IOException {
    MergePolicy mergePolicy = mergePolicy();
    IOStats stats = new IOStats();
    AtomicLong segNameGenerator = new AtomicLong();
    MergeContext mergeContext = new MockMergeContext(SegmentCommitInfo::getDelCount);
    SegmentInfos segmentInfos = new SegmentInfos(Version.LATEST.major);
    // Both the docs per flush and doc size are small because these are the typical cases that used
    // to trigger pathological O(n^2) merging due to floor segment sizes
    final double avgDocSizeMB = 10. / 1024 / 1024;
    final int maxDocsPerFlush = 3;
    final int totalDocs = 10_000;
    int numFlushes = 0;
    for (int numDocs = 0; numDocs < totalDocs; ) {
      int flushDocCount = TestUtil.nextInt(random(), 1, maxDocsPerFlush);
      numDocs += flushDocCount;
      double flushSizeMB = flushDocCount * avgDocSizeMB;
      stats.flushBytesWritten += flushSizeMB * 1024 * 1024;
      segmentInfos.add(
          makeSegmentCommitInfo(
              "_" + segNameGenerator.getAndIncrement(),
              flushDocCount,
              0,
              flushSizeMB,
              IndexWriter.SOURCE_FLUSH));
      ++numFlushes;

      MergeSpecification merges =
          mergePolicy.findMerges(MergeTrigger.SEGMENT_FLUSH, segmentInfos, mergeContext);
      while (merges != null) {
        assertTrue(merges.merges.size() > 0);
        assertMerge(mergePolicy, merges);
        for (OneMerge oneMerge : merges.merges) {
          segmentInfos =
              applyMerge(segmentInfos, oneMerge, "_" + segNameGenerator.getAndIncrement(), stats);
        }
        merges = mergePolicy.findMerges(MergeTrigger.MERGE_FINISHED, segmentInfos, mergeContext);
      }
      assertSegmentInfos(mergePolicy, segmentInfos);
    }

    final double writeAmplification =
        (double) (stats.flushBytesWritten + stats.mergeBytesWritten) / stats.flushBytesWritten;
    // Assuming a merge factor of 2, which is the value that triggers the most write amplification,
    // the total write amplification would be ~ log(numFlushes)/log(2). We allow merge policies to
    // have a write amplification up to log(numFlushes)/log(1.5). Greater values would indicate a
    // problem with the merge policy.
    final double maxAllowedWriteAmplification = Math.log(numFlushes) / Math.log(1.5);
    // Report the actual numbers so that a failure can be diagnosed without re-running the test.
    assertTrue(
        "write amplification "
            + writeAmplification
            + " must be less than "
            + maxAllowedWriteAmplification
            + " (numFlushes="
            + numFlushes
            + ")",
        writeAmplification < maxAllowedWriteAmplification);
  }
}
